Glue Data Qualityで品質検査ルールに不合格となったレコードを特定する
データアナリティクス事業本部インテグレーション部機械学習チーム・新納(にいの)です。
データのETLを担うサービスであるAWS Glueの機能群の一つ、Glue Data QualityではGUIベースの操作でデータの品質検査が可能です。データレイクやデータウェアハウスへのデータ投入前に、値の長さチェックや特定カラムがNULLでないかどうかなど、あらかじめ定義しておいたルールに合格しているかどうかを確認できます。
今回は、定義したルールに不合格となったレコードを特定する方法を検証してみました。
Glue Data Qualityについておさらい
ユーザーが定義したルールに従ってデータの品質検査をできるAWS Glueの機能のひとつです。AWSが開発したOSSであるDeequを使用しており、ルールの定義にはDQDL(Data Quality Definition Language)を利用します。
詳細は以下をご参照ください。
以下のエントリでは動画で実際の操作をご覧いただけます。
ルールに不合格となったレコードを特定する
シナリオ
今回は以下のような架空のユーザーデータを使って、name
がNULLのものはないか、credit_card
が全て16桁であるかチェックします。このルールに合致しないレコードをAmazon Athenaを使って特定します。
"name","name_hiragana","age","birthday","gender","email","phone_number","zipcode","address","company","credit_card","exp_date" "鈴木 賢司","すずき けんじ",72,"1952-06-11","M","kenjisuzuki@example.jp","04-3537-7908","337-1051","埼玉県川口市南鳩ヶ谷2-3-10","","4684847007028235","06/27" "小宮 真由子","こみや まゆこ",50,"1974-04-07","F","komiyamayuko@example.com","0567-38-4614","453-3750","愛知県春日井市高蔵寺町1-5-13","合同会社KANDICapitalManagement","3574517766884178","05/28" "堀 勇希","ほり ゆうき",55,"1968-11-13","M","yuuki_hori@example.net","0228-18-2254","988-2801","宮城県仙台市泉区松陵1-2-5","株式会社ゼウス","3540484972504721","02/28" "佐藤 涼","さとう りょう",31,"1992-12-02","M","ryou_sato@example.ne.jp","0823-74-1579","725-9090","広島県呉市本通2-3-13","有限会社ビクトリー","3564069950438371","05/26" "長浜 裕子","ながはま ゆうこ",24,"2000-05-10","F","nagahama_yuuko@example.org","0466-71-5467","233-9894","神奈川県横浜市旭区笹野台1-1-17","合同会社エバレッジ","372675929782430","10/25" "前田 雅也","まえだ まさや",45,"1979-04-05","M","maeda45@example.jp","03-1493-8474","118-4100","東京都台東区松が谷2-1-3ユーハウス106","","4378069700369912","01/26" "片山 隼","かたやま はやと",30,"1993-12-29","M","katayama_1229@example.org","03-1912-4421","123-8031","東京都江東区亀戸2-1-6","合同会社安清","371168643360445","02/25" ...(中略)... "蔵 めそ","くら めそ",30,"1993-11-30","M","kura_meso@example.co.jp","06-9636-4964","586-5852","大阪府大阪市中央区","","13584595506048971","09/28"
Glue Job作成
Data QualityはGlue Data CatalogとGlue Jobのどちらかから利用可能です。今回はGlue Job(Visual Job)のを利用し、S3に配置した上記のファイルを品質チェックしてその結果をS3バケットに出力します。
SourceにはAmazon S3を指定し、インプットデータのURIを指定します。
Evaluate Data Qualityノードを追加してデータ品質ルールを定義します。 Ruleset editorに以下の通りDQDLを定義します。
Rules = [ ColumnLength "credit_card" = 16, IsComplete "name" ]
Data quality transform outputではデータ品質検査の出力について定義します。
項目 | 内容 |
---|---|
Original data | 有効にすると元の入力データを出力する。 |
Add new columns to indicate data quality errors | Original dataを有効にした場合のみ選択可能。有効にすると以下のカラムを元の入力データに追加して出力する。 dataqualityrulespass:合格したルール dataqualityrulefail:不合格になったルール dataqualityrulesskip:スキップされたルール。ただし、次のルールはデータセットレベルで有効なため、不合格レコードを特定できない。 AggregateMatch・ColumnCount・ColumnExists・ColumnNamesMatchPattern・CustomSql・RowCount・RowCountMatch・StandardDeviation・Mean・ColumnCorrelation dataqualityevaluationresult:ルールの合否(Failed/Passed) |
Data quality results | 有効にするとルールと合否ステータスを出力する。 |
上記の設定は全て有効にして、最終的なアウトプットにData Qualityの結果カラムを追加されるように設定しました。
Data quality actionsでは、ルールが不合格だった場合にジョブをどうするか設定可能です。「Continue with job(ジョブを続行する)」、「Fail job after loading target data(ターゲットデータをロードしてからジョブを異常終了する)」のどちらかを選んでください。「Fail job without loading target data(ターゲットデータをロードせずジョブを異常終了する)」を選択するとData Qualityの結果カラムが出力されなくなります。今回はロードしてから異常終了する方を選択しました。
Original dataを有効にするとrowLevelOutcomesノードが、Data quality resultsを有効にするとruleOutcomesノードがそれぞれ自動で追加されます。この2つのノードには手動で設定を追加する必要はありません。
それぞれの結果をS3バケットに保存するため、Data targetとしてAmazon S3ノードを追加します。rowLevelOutcomesの出力結果を使って不合格レコードを特定するため、わかりやすいようにruleOutcomesと出力先のフォルダは分けておきましょう。
設定が完了したらSaveからジョブを保存し、ジョブを実行します。今回は「Fail job after loading target data(ターゲットデータをロードしてからジョブを異常終了する)」を選択しており、ルールに適合しないレコードがあるため、Error Category: UNCLASSIFIED_ERROR; AssertionError: The job failed due to failing DQ rules for node
というエラーでジョブ自体が異常終了します。
Data qualityタブを見ると、ColumnLength "credit_card" = 16
のルールが不合格だったことが確認できます。どのレコードが不合格だったのか特定してみましょう。
rowLevelOutcomesの出力結果を保存したファイルを確認すると、元の入力データにData Qualityの結果カラムが追加されていることが確認できます。このままでは確認しづらいため、Amazon Athenaからクエリできるように設定してみましょう。
Amazon Athenaで不合格レコードを特定
今回はGlue crawlerを使って、rowLevelOutcomesの出力先からテーブルを作成しました。テーブル定義は以下の通り。
CREATE EXTERNAL TABLE `user_jpoutput`( `name` string, `name_hiragana` string, `age` string, `birthday` string, `gender` string, `email` string, `phone_number` string, `zipcode` string, `address` string, `company` string, `credit_card` string, `exp_date` string, `dataqualityrulespass` array<string>, `dataqualityrulesfail` array<string>, `dataqualityrulesskip` array<string>, `dataqualityevaluationresult` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://<rowLevelOutcomesの出力先S3バケット>/' TBLPROPERTIES ( 'CrawlerSchemaDeserializerVersion'='1.0', 'CrawlerSchemaSerializerVersion'='1.0', 'UPDATED_BY_CRAWLER'='nino-dq-test', 'averageRecordSize'='197', 'classification'='parquet', 'compressionType'='none', 'objectCount'='2', 'recordCount'='2002', 'sizeKey'='221694', 'typeOfData'='file')
作成されたテーブルを見ると、dataqualityrulespass
、dataqualityrulefail
、dataqualityrulesskip
、dataqualityevaluationresult
の4つのData Qualityの結果カラムが追加されていることが確認できます。
WHERE "dataqualityevaluationresult" = 'Failed'
でクエリすればルールに不合格となったレコードを特定することができました。
最後に
データ品質ルールに不合格となったレコードを特定する方法をご紹介しました。この方法を使えば、データレイク・データウェアハウスへ投入する前に不具合のあるデータを検知し、不具合のあるレコードに関してすぐに対策を取ることができます。